前回 Ruby の TCPソケット通信で MySQL の認証を通すやり方を書いたので、その続き。
認証を通せれば、Rubyから SQL を発行して実行できるようになるのでやり方をまとめる。
MySQL には各種処理を実行するためのコマンドというのがあり、SQL の実行は COM_QUERY というコマンドになる。
主要なコマンド一覧
| コマンド | 値 | 渡すもの |
|---|---|---|
| COM_QUIT | 0x01 | なし |
| COM_INIT_DB | 0x02 | database名 |
| COM_QUERY | 0x03 | SQLクエリ文字列 |
| COM_PING | 0x0E | なし |
| COM_BINLOG_DUMP | 0x12 | binlog位置情報 |
| COM_REGISTER_SLAVE | 0x15 | スレーブ情報 |
| COM_STMT_PREPARE | 0x16 | SQLクエリ |
| COM_STMT_EXECUTE | 0x17 | statement_id + パラメータ |
| COM_STMT_CLOSE | 0x19 | statement_id |
コマンド値は 8ビット符号なし整数でパケット送信する。
例えば、接続確認用の PING コマンドの実行と結果の確認はこんな感じで取れる。
socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
sequence_id = 0
def read_packet
header = socket.read(4)
packet_length = header[0].unpack1('C') | (header[1].unpack1('C') << 8) | (header[2].unpack1('C') << 16)
sequence_id = header[3]unpack1('C')
payload = socket.read(packet_length)
{ packet_length:, payload: }
end
def send_packet(payload)
sequence_id += 1
packet_length = payload.length
header = [packet_length].pack('V')[0..2] + [sequence_id].pack('C')
socket.write(header + payload)
end
payload = [0x0E].pack('C') # PINGコマンド
send_packet(payload)
response = read_packet(payload)
if response[:payload][0].unpack1('C') == 0x00 # 成功
puts 'PONG'
else
puts 'Oops'
end
COM_QUERY コマンドを実行する
SQL を実行させるための COM_QUERY コマンドのペイロードはこれでOK
payload = [0x03].pack('C') + sql.encode('UTF-8')クエリ実行のパケットを送信すると、MySQL でクエリが実行されて結果をパケット受信できるようになる。
実行結果は OK Packet / ERR Packet / Result Set の3パターンあって、構造が違うのでそれぞれパースする必要がある。
どのパターンかはパケットのペイロードの 1 バイト目を見ればわかるようになっている。
- 0x00 -> OK Packet
- 0xFF -> Err Packet
- 0x01~0xFA -> Result Set
- 0xFB -> Local INFILE Request
LOCAL INFILE Request は特殊なので今回は未対応。
OK Packet の場合
結果セットを返さないクエリが成功したときがこれ。
メジャーなところだと、INSERT / UPDATE / DELETE文を実行したときこれが返る。
パケットの構造はこんな感じ。
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | header | 0x00が入る |
| length-encoded integer | affected_rows | |
| length-encoded integer | last_insert_id | |
| 2 bytes integer | status_flags | サーバ状態を表すフラグ |
| 2 bytes integer | warnings | |
| N bytes string | info | NULL終端 |
length-encoded integer は最初の 1 バイトで長さが決まる整数。
最初の 1 バイトごとにこんな感じで長さと値を取る。
- 0x00 から 0xFA(250)
- 最初の 1 バイトがそのまま値になる
- 0xFC
- 後続の 2 バイトが値
- 0xFD
- 後続の 3 バイトが値
- 0xFE
- 後続の 8 バイトが値
- 0xFB
- NULL
def length_encoded_integer(payload, offset)
first_byte = payload[offset].unpack1('C')
case first_byte
when 0..250
{ value: first_byte, bytes_read: 1 }
when 0xFC
value = payload[(offset + 1)..(offset + 2)].unpack1('s<')
{ value: value, bytes_read: 3 }
when 0xFD
value = payload[(offset + 1)..(offset + 3)].unpack1('V') & 0xFFFFFF
{ value: value, bytes_read: 4 }
when 0xFE
value = payload[(offset + 1)..(offset + 8)].unpack1('Q<')
{ value: value, bytes_read: 9 }
else # Included 0xFB
{ value: nil, bytes_read: 1 }
end
endERR Packet の場合
文字通りクエリ実行がエラーになった時がこれ。
パケットの構造はこんな感じ。
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | header | 0xFFが入る |
| 2 bytes integer | error_code | |
| 1 byte integer | sql_state_marker | "#"固定 |
| 5 bytes string | sql_state | |
| N bytes string | error_message | NULL終端 |
Result Set の場合
SELECT みたいな結果行レコードが返るクエリがこれ。
こいつは複雑で、複数のパケットで構成される。
- カラム数パケット
- この後何回カラム定義パケットを取ればいいかを伝える
- カラム定義パケット
- EOFパケット
- カラム定義パケットが終わったことを伝える
- 行データパケット
- EOFパケット
- 行データパケットが終わったことを伝える
カラム数パケット
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | column_count | カラムの数が入る |
カラム定義パケット
| サイズ | フィールド | |
|---|---|---|
| length-encoded string | catalog | "def" というダミー値固定 |
| length-encoded string | schema | データベース名 |
| length-encoded string | table | テーブル名(エイリアス) |
| length-encoded string | org_table | 物理テーブル名 |
| length-encoded string | name | カラム名(エイリアス) |
| length-encoded string | org_name | 物理カラム名 |
| length-encoded integer | length_of_fixed_fields | "0x0c" 固定 |
| 2 bytes integer | charset | |
| 4 bytes integer | column_length | カラムの最大長 |
| 1 byte integer | column_type | データ型 |
| 2 bytes integer | flags | カラムの属性を表すビットフラグ |
| 1 byte integer | decimals | 小数点以下の桁数 |
| 2 bytes integer | filter | "0x00 0x00" 固定 |
length-encoded string は length-encoded integer + length-encoded integer が返す値の長さの文字列。
こんな感じで取る。
def length_encoded_string(payload, offset)
length_info = length_encoded_integer(payload, offset)
return { value: '', bytes_read: length_info[:bytes_read] } if length_info[:value].nil?
string_start = length_info[:bytes_read]
string_end = string_start + length_info[:value] - 1
value = payload[string_start..string_end] || ''
{ value: value, bytes_read: length_info[:bytes_read] + length_info[:value] }
endEOF パケット
| サイズ | フィールド | |
|---|---|---|
| 1 byte integer | eof | "0xFE" 固定 |
行データパケット
カラム数分 length-encoded string が繰り返され、取れる値がカラムの値になる。
ただし、1 バイト目が 0xFB の場合は値が NULL になっているので最初に NULL チェックする。
これらをまとめると、Result Set パケットを処理するコードはこんな感じになる。
# カラム定義パケットのパース
def parse_column_definition(payload)
offset = 0
catalog = length_encoded_string(payload, offset)
offset += catalog[:bytes_read]
schema = length_encoded_string(payload, offset)
offset += schema[:bytes_read]
table = length_encoded_string(payload, offset)
offset += table[:bytes_read]
org_table = length_encoded_string(payload, offset)
offset += org_table[:bytes_read]
name = length_encoded_string(payload, offset)
offset += name[:bytes_read]
org_name = length_encoded_string(payload, offset)
offset += org_name[:bytes_read]
# length of fixed-length fields (1 byte)
offset += 1
charset = payload[offset..(offset + 1)].unpack1('v')
offset += 2
column_length = payload[offset..(offset + 3)].unpack1('V')
offset += 4
column_type = payload[offset].unpack1('C')
{
schema: schema[:value],
table: table[:value],
org_table: org_table[:value],
name: name[:value],
org_name: org_name[:value],
charset: charset,
column_length: column_length,
column_type: column_type
}
end
# 行データパケットのパース
def parse_row_data(payload, columns)
first_byte = payload[0].unpack1('C')
row = {}
offset = 0
columns.each do |column|
column_name_key = column[:name].downcase.to_sym
if offset >= payload.length
row[column_name_key] = nil
next
end
if first_byte == 0xFB
# NULL value
row[column_name_key] = nil
offset += 1
else
# row data (length-encoded string)
value = length_encoded_string(payload, offset)
row[column_name_key] = value[:value]
offset += value[:bytes_read]
end
end
row
end
# カラム数パケット、カラム定義パケットを処理
columns = []
column_count = length_encoded_integer(payload, 0)[:value].to_i
column_count.times do
column_packet = read_packet
column_info = parse_column_definition(column_packet[:payload])
columns << column_info
end
# EOF パケットを読み進める
connection.read_packet
# 行データパケットを処理
rows = []
loop do
row_packet = read_packet
# EOF パケットになったら終了
if row_packet[:payload][0].unpack1('C') == 0xFE
break
end
rows << parse_row_data(row_packet[:payload], columns)
end
puts rows